-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add abiltity to configure error output #668
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ekneg54
force-pushed
the
dev-implement-error-output
branch
3 times, most recently
from
September 20, 2024 12:42
ae1d775
to
757ef5a
Compare
ekneg54
force-pushed
the
dev-implement-error-output
branch
from
September 26, 2024 17:33
68d2cdb
to
69446fc
Compare
This was referenced Sep 26, 2024
Merged
Merged
ekneg54
force-pushed
the
dev-implement-error-output
branch
2 times, most recently
from
October 2, 2024 12:36
1ff01f5
to
2f1aab3
Compare
ekneg54
force-pushed
the
dev-implement-error-output
branch
5 times, most recently
from
October 7, 2024 11:06
1238998
to
895c4cd
Compare
additional work:
|
remove store_failed
add a componentqueuelistener to handle errors from queue into output connector
fix pipeline_manager tests
add tests for componentqueuelistener add more tests
remove double property WIP
bump test coverage for pipeline_manager to 100 percent fix most acceptance tests by adding error_output add more tests start fixing pipeline.py tests
add basic tests for pipeline_result add tests for pipeline_result
dtrai2
requested changes
Oct 23, 2024
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more considerations:
- please check the rst configuration references for the documentation, currently the error_output is missing
-
tests/unit/charts/test_output_config.py
has aerror_index
inside the opensearch configuration, which should be removed - consider if closing the component queue listener queue before draining (prevents adding new events from other processes)
- please check the test coverage
- Renamed multiple test functions for clarity and consistency. - Updated logging messages to provide better context for errors. - Improved documentation links and descriptions in YAML and Markdown files. - Fix method signatures
- Consolidate OutputQueueListener to use multiprocessing exclusively. - Remove threading implementation and related configurations. - Update tests and documentation to reflect these changes.
- Updated method name for improved clarity and consistency. - Adjusted related tests and function calls accordingly. - Enhanced documentation within the new method.
- Removed redundant case clauses for `CriticalOutputError` handling. - Updated unit tests to cover new error handling logic.
- Ensure test does not hang if error output file is not created. - Timeout set to 10 seconds to prevent indefinite waiting periods.
- Ensure volume mounts do not include error-output-config - Check command string does not reference error-output-config.yaml
- InvalidConfigurationError receives an unspecified amount of arguments that couldn't be successfully forwarded to the LogprepException
- Added a new test to ensure logging of errors when error output itself encounters an error.
- ignore the 1 that is added to the error_queque for process synchronization reasons
- adjust `wait_for_output` to exclude specific forbidden outputs - add comment to clarify purpose of exclusion
- and as it can't be reached it also can't be tested
- Introduced tests for `listen` and `drain_queue` methods. - Verified logging of unexpected exceptions during queue processing. - Ensured specific items and sentinel values are ignored during queue operations. - Increases test coverage
- test is working locally but runs forever in ci pipeline
This commit refactors the ConfluentKafkaInput class to store offsets for the last message referenced by `_last_valid_records`. Previously, offsets were stored for each kafka partition in `_last_valid_records`, but now only the last valid record is stored. This change improves the efficiency of offset storage and reduces memory usage. Code changes: - Modified `ConfluentKafkaInput` class in `logprep/connector/confluent_kafka/input.py` - Removed `_last_valid_records` dictionary and replaced it with `_last_valid_record` variable - Updated `batch_finished_callback` method to store offsets for the last valid record
dtrai2
approved these changes
Oct 29, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This changes the error handling implementation of logprep.
This PR aims at two main goals.
This is handled by not raising FatalOutputError or FatalInputError but instead raise a CriticalInputError or a CriticalOutputError.
No error is handled in the pipeline process anymore all error causing events are written to error output.
To achieve this, I had to change the batch_finished_callback mechanic. As now every event gotten from input via get_next is committed to kafka utilizing the batch_finished_callback mechanic in the pipeline.py. no connections between intput and output connectors anymore.
to make it simple. This PR has to ensure, that every event goes into output, error output or gets logged to console as last resort.
Every error raising event will be serialized together with its raising error to an error event and is put into a multiprocessing.Queue (ThrottlingQueue). In the main Thread these events were handled in a configured error output connector which indeed can be any output connector implemented in logprep.
To achieve theses goals I had to reimplement the opensearch output connector to simplify things a lot.
Please have a look on my changes and lets discuss. Feel free to give feedback and to ask your questions.
It is a very big PR. Sorry for that but the cut was a fundamental one.